package sf.r2dbc.sql;

import io.r2dbc.spi.Connection;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Row;
import io.r2dbc.spi.RowMetadata;
import io.r2dbc.spi.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import sf.common.exception.SmallOrmException;
import sf.common.log.LogContext;
import sf.common.log.OrmLog;
import sf.common.wrapper.Page;
import sf.core.DBCascadeField;
import sf.core.DBField;
import sf.core.DBObject;
import sf.database.dao.DBContext;
import sf.database.dialect.DBDialect;
import sf.database.dialect.DBProperty;
import sf.database.jdbc.sql.CrudModelImpl;
import sf.database.jdbc.sql.ModelSql;
import sf.database.jdbc.sql.PageStrategy;
import sf.database.jdbc.sql.SQLContext;
import sf.database.jdbc.sql.SQLParameter;
import sf.database.listener.EntityListenerManager;
import sf.database.meta.CascadeConfig;
import sf.database.meta.CascadeContext;
import sf.database.meta.CascadeUtils;
import sf.database.meta.ColumnMapping;
import sf.database.meta.MetaHolder;
import sf.database.meta.TableMapping;
import sf.database.support.DMLType;
import sf.database.util.DBUtils;
import sf.database.util.OrmUtils;
import sf.database.util.OrmValueUtils;
import sf.dsl.example.SelectOpt;
import sf.r2dbc.binding.BindMarker;
import sf.r2dbc.binding.BindMarkersFactory;
import sf.r2dbc.handle.R2dbcPageListHandler;
import sf.r2dbc.rowmapper.R2dbcMapRowMapper;
import sf.r2dbc.rowmapper.R2dbcRowMapper;
import sf.r2dbc.rowmapper.R2dbcRowMapperHelp;
import sf.spring.util.Assert;
import sf.spring.util.CollectionUtils;
import sf.tools.JavaTypeUtils;
import sf.tools.NumberUtils;
import sf.tools.StringUtils;

import javax.persistence.GenerationType;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.BiFunction;

public class R2dbcCrudModelImpl implements R2dbcCrudModelInf {
    private static final Logger logger = LoggerFactory.getLogger(R2dbcCrudModelImpl.class);

    private static R2dbcCrudModelImpl instance = new R2dbcCrudModelImpl();

    private R2dbcCrudModelImpl() {

    }

    public static R2dbcCrudModelImpl getInstance() {
        return instance;
    }

    @Override
    public <T extends DBObject> Mono<Long> selectCount(Connection connection, T query) {
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        SQLContext sqlContext = ModelSql.getQuerySelect(dbContext, dialect, query, false, true);
        return selectOne(connection, Long.class, sqlContext).defaultIfEmpty(0L);
    }

    @Override
    public <T extends DBObject> Mono<Boolean> existsByPrimaryKeys(Connection conn, Class<T> clz, Object... keyParams) {
        TableMapping table = MetaHolder.getMeta(clz);
        List<ColumnMapping> cmList = table.getPkFields();
        if (cmList.size() != keyParams.length) {
            throw new SmallOrmException("主键参数不对");
        }
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        SQLContext context = ModelSql.forModelExistsByIds(dbContext, table, dialect, keyParams);
        return selectOne(conn, Byte.class, context).hasElement();
    }

    @Override
    public <T extends DBObject> Mono<Boolean> exists(Connection connection, T query) {
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        SQLContext sqlContext = ModelSql.getQuerySelect(dbContext, dialect, query, false, SelectOpt.exists, false);
        String sql = dialect.sqlPageList(new StringBuilder(sqlContext.getSql()), 0, 1).toString();
        sqlContext.setSql(sql);
        return selectOne(connection, Byte.class, sqlContext).hasElement();
    }

    @Override
    public <T extends DBObject> Flux<T> selectList(Connection connection, T query) {
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        SQLContext select = ModelSql.getQuerySelect(dbContext, dialect, query, false, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectList(connection, clz, select);
    }

    @Override
    public <T extends DBObject> Flux<T> selectList(Connection conn, T query, long start, int limit) {
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext context = R2dbcUtils.getDBContext(conn);
        SQLContext select = ModelSql.getQuerySelect(context, dialect, query, false, null, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectList(conn, clz, select, start, limit);
    }

    @Override
    public <T extends DBObject> Flux<T> selectListForUpdate(Connection conn, T query) {
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext context = R2dbcUtils.getDBContext(conn);
        SQLContext select = ModelSql.getQuerySelect(context, dialect, query, true, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectList(conn, clz, select);
    }

    @Override
    public <T extends DBObject> Mono<T> selectByPrimaryKeys(Connection connection, Class<T> clz, Object... keyParams) {
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        TableMapping table = MetaHolder.getMeta(clz);
        List<ColumnMapping> cmList = table.getPkFields();
        if (cmList.size() != keyParams.length) {
            throw new SmallOrmException("主键参数不对");
        }
        SQLContext context = ModelSql.forModelSelectByIds(dbContext, table, dialect, keyParams);
        R2dbcRowMapper<T> rowMapper = R2dbcRowMapperHelp.getBeanRowMapper(clz, dbContext);
        return select(connection, rowMapper, context).next();
    }

    /**
     * @param conn
     * @param context
     * @return
     * @see org.springframework.r2dbc.core.ColumnMapRowMapper
     */
    @Override
    public Flux<Map<String, Object>> selectListMap(Connection conn, SQLContext context) {
        return select(conn, R2dbcMapRowMapper.INSTANCE, context);
    }

    @Override
    public <T extends DBObject> Mono<T> selectOne(Connection connection, T query) {
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        SQLContext select = ModelSql.getQuerySelect(dbContext, dialect, query, false, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectOne(connection, clz, select);
    }

    @Override
    public <T> Mono<T> selectOne(Connection connection, Class<T> beanClass, SQLContext context) {
        Assert.notNull(context, "context is null.");
        Assert.notNull(beanClass, "beanClass is null.");
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        R2dbcRowMapper<T> rsh = R2dbcRowMapperHelp.getRowMapper(beanClass, dbContext);
        return select(connection, rsh, context).next();
    }

    @Override
    public <T extends DBObject> Mono<T> selectOneForUpdate(Connection connection, T query) {
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        SQLContext select = ModelSql.getQuerySelect(dbContext, dialect, query, true, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectOne(connection, clz, select);
    }

    public <T> Flux<T> selectList(Connection connection, Class<T> beanClass, SQLContext context) {
        Assert.notNull(context, "context is null.");
        Assert.notNull(beanClass, "beanClass is null.");
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        R2dbcRowMapper<T> rsh = R2dbcRowMapperHelp.getRowMapper(beanClass, dbContext);
        return select(connection, rsh, context);
    }

    @Override
    public <T> Flux<T> selectList(Connection conn, Class<T> beanClass, SQLContext context, long start, int limit) {
        Assert.notNull(context, "context is null.");
        Assert.notNull(beanClass, "beanClass is null.");
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        String sql = dialect.sqlPageList(new StringBuilder(context.getSql()), start, limit).toString();
        context.setSql(sql);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        R2dbcRowMapper<T> rsh = R2dbcRowMapperHelp.getRowMapper(beanClass, dbContext);
        return select(conn, rsh, context);
    }

    @Override
    public <T extends DBObject> Mono<Page<T>> selectPage(Connection conn, T query, long start, int limit) {
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext context = R2dbcUtils.getDBContext(conn);
        SQLContext select = ModelSql.getQuerySelect(context, dialect, query, false, SelectOpt.page, false);
        Class<T> clz = (Class<T>) query.getClass();
        return selectPage(conn, start, limit, clz, select);
    }

    /**
     * @param conn
     * @param start     0开始
     * @param limit     限制多少条数据
     * @param beanClass
     * @return
     */
    @Override
    public <T> Mono<Page<T>> selectPage(Connection conn, long start, int limit, Class<T> beanClass, SQLContext context) {
        Assert.notNull(beanClass, "beanClass is null.");
        String sql = context.getSql();
        String countSql = context.getCountSql();
        String listSql = context.getListSql();
        if (StringUtils.isBlank(countSql)) {
            countSql = DBUtils.getSmartSqlSelectCount(sql);
        }
        if (StringUtils.isBlank(listSql)) {
            listSql = sql;
        }
        String listSqlRef = listSql;
        //设置为查询总数的sql
        context.setSql(countSql);
        return selectOne(conn, Long.class, context).defaultIfEmpty(0L).flatMap(count -> {
            Mono<List<T>> items = Mono.just(Collections.emptyList());
            if (count > 0) {
                String pageSql = R2dbcUtils.getDBDialect(conn).sqlPageList(new StringBuilder(listSqlRef), start, limit).toString();
                if (StringUtils.isBlank(pageSql)) {
                    DBContext dbContext = R2dbcUtils.getDBContext(conn);
                    R2dbcRowMapper<T> rowMapper = R2dbcRowMapperHelp.getRowMapper(beanClass, dbContext);
                    R2dbcPageListHandler<T> rsh = new R2dbcPageListHandler<T>(rowMapper);
                    rsh.setFirstResult((int) start);// 如果不支持分页，那么使用原始的分页方法 ResultSet.absolute(first)
                    rsh.setMaxResults(limit);
                    context.setSql(listSqlRef);
                    items = select(conn, rsh, context).collectList();
                } else {
                    // 使用数据库自身的分页SQL语句，将直接返回某一个
                    context.setSql(pageSql);
                    items = selectList(conn, beanClass, context).collectList();
                }
            }
            return items.map(it -> {
                Page<T> page = new Page<>(start, limit, count);
                page.setList(it);
                return page;
            });
        });
    }

    @Override
    public <T> Mono<Page<T>> selectPage(Connection conn, long start, int limit, Class<T> beanClass, SQLContext context, PageStrategy strategy) {
        Assert.notNull(beanClass, "beanClass is null.");
        String sql = context.getSql();
        String countSql = context.getCountSql();
        String listSql = context.getListSql();
        if (StringUtils.isBlank(countSql)) {
            countSql = DBUtils.getSmartSqlSelectCount(sql);
        }
        if (StringUtils.isBlank(listSql)) {
            listSql = sql;
        }
        String listSqlRef = listSql;
        //设置为查询总数的sql
        context.setSql(countSql);
        return selectOne(conn, Long.class, context).defaultIfEmpty(0L).flatMap(count -> {
            Mono<List<T>> items = Mono.just(Collections.emptyList());
            if (count > 0) {
                //改为原来的
                context.setSql(sql);
                switch (strategy) {
                    case onlyList://只查询列表,不包含分页参数.
                        String pageSql = R2dbcUtils.getDBDialect(conn).sqlPageList(new StringBuilder(listSqlRef), start, limit).toString();
                        if (StringUtils.isNotBlank(pageSql)) {
                            // 使用数据库自身的分页SQL语句，将直接返回某一个
                            context.setSql(pageSql);
                            items = selectList(conn, beanClass, context).collectList();
                        } else {
                            throw new SmallOrmException("not find page sql,please check you sql");
                        }
                        break;
                    case fake:
                        DBContext dbContext = R2dbcUtils.getDBContext(conn);
                        R2dbcRowMapper<T> rowMapper = R2dbcRowMapperHelp.getRowMapper(beanClass, dbContext);
                        R2dbcPageListHandler<T> rsh = new R2dbcPageListHandler<T>(rowMapper);
                        rsh.setFirstResult((int) start);// 如果不支持分页，那么使用原始的分页方法 ResultSet.absolute(first)
                        rsh.setMaxResults(limit);
                        context.setSql(listSqlRef);
                        items = select(conn, rsh, context).collectList();
                        break;
                    case hasOffsetLimit:
                        // 查询语句已包含offset,limit 使用数据库自身的分页SQL语句，将直接返回某一个
                        context.setSql(listSqlRef);
                        items = selectList(conn, beanClass, context).collectList();
                        break;
                    default:
                        break;
                }
            }
            return items.map(it -> {
                Page<T> page = new Page<>(start, limit, count);
                page.setList(it);
                return page;
            });
        });
    }

    protected static <T> Flux<T> select(Connection connection, BiFunction<Row, RowMetadata, T> mappingFunction, SQLContext context) {
        Assert.notNull(mappingFunction, "rsh is null.");
        Assert.notNull(context, "context is null.");
        if (StringUtils.isBlank(context.getSql())) {
            logger.warn("select sql is null.");
            return Flux.empty();
        }

        LogContext log = OrmLog.commonLog(null, context.getSql(), context.getValues());

        BindMarkersFactory bmf = R2dbcUtils.getBindMarkersFactory(connection);
        List<BindMarker> bindMarkerList = new ArrayList<>();
        String innerSql = R2dbcUtils.parseSql(bmf, bindMarkerList, context.getSql());

        Statement statement = connection.createStatement(innerSql);

        R2dbcSql.fillSQLStatement(statement, bindMarkerList, context.getParas());
        OrmLog.sqlLog(null, log, () -> R2dbcUtils.getAutoCommit(connection));
        return Flux.from(statement.execute())
                .flatMap(r -> r.map(mappingFunction))
//                .doOnNext(OrmLog::resultOnly)
                .doOnComplete(() -> OrmLog.resultTime(null, log))
                .doOnCancel(() -> OrmLog.resultTime(null, log))
                .doOnError(throwable -> OrmLog.resultSqlLog(null, log, (Exception) throwable));
    }

    protected <T> Flux<T> select(Connection conn, R2dbcResultSetCallback<T> callback, SQLContext context) {
        Assert.notNull(callback, "rsh is null.");
        Assert.notNull(context, "context is null.");
        if (StringUtils.isBlank(context.getSql())) {
            logger.warn("select sql is null.");
            return Flux.empty();
        }

        LogContext log = OrmLog.commonLog(null, context.getSql(), context.getValues());

        BindMarkersFactory bmf = R2dbcUtils.getBindMarkersFactory(conn);
        List<BindMarker> bindMarkerList = new ArrayList<>();
        String innerSql = R2dbcUtils.parseSql(bmf, bindMarkerList, context.getSql());

        Statement statement = conn.createStatement(innerSql);
        R2dbcSql.fillSQLStatement(statement, bindMarkerList, context.getParas());

        OrmLog.sqlLog(null, log, () -> R2dbcUtils.getAutoCommit(conn));
        return callback.callback(Flux.from(statement.execute()))
//                .doOnNext(OrmLog::resultOnly)
                .doOnComplete(() -> OrmLog.resultTime(null, log))
                .doOnCancel(() -> OrmLog.resultTime(null, log))
                .doOnError(throwable -> OrmLog.resultLog(null, log, null));
    }

    @Override
    public <T extends DBObject> Mono<Integer> insert(Connection connection, T obj) {
        return insert(connection, obj, false, true);
    }

    @Override
    public <T extends DBObject> Mono<Integer> insert(Connection connection, T obj, boolean fast, boolean useOptimisticLock) {
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        SQLContext context = ModelSql.model2Save(null, dbContext, table, dialect, obj, useOptimisticLock);
        return batch(connection, dialect, table, context.getSql(), Collections.singletonList(context), Collections.singletonList(1),
                Collections.singletonList(obj), DMLType.INSERT, fast).last(0);
    }

    @Override
    public <T extends DBObject> Mono<Integer> update(Connection connection, T entity) {
        //不开启乐观锁校验
        return update(connection, entity, new SQLContext(), false);
    }

    /**
     * @param obj
     * @param context           不可为null
     * @param useOptimisticLock 是否使用乐观锁
     * @return 更新行数, 不报错
     */
    protected Mono<Integer> update(Connection connection, DBObject obj, SQLContext context, boolean useOptimisticLock) {
        DBDialect dialect = R2dbcUtils.getDBDialect(connection);
        DBContext dbContext = R2dbcUtils.getDBContext(connection);
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        EntityListenerManager.runPreUpdate(obj);
        ModelSql.forModelUpdate(dbContext, table, dialect, obj, context, useOptimisticLock, false);
        return execute(connection, context).doOnNext(s -> {
            obj.clearUpdate();
            EntityListenerManager.runPostUpdate(obj);
        });
    }

    @Override
    public <T extends DBObject> Mono<Integer> updateAndSet(Connection connection, T obj) {
        SQLContext context = new SQLContext();
        return update(connection, obj, context, true).flatMap(result -> setVersionValue(connection, result, context, obj));
    }

    private <T extends DBObject> Mono<Integer> setVersionValue(Connection connection, int result, SQLContext context, T obj) {
        if (result > 0) {
            TableMapping tm = MetaHolder.getMeta(obj.getClass());
            Map<DBField, ColumnMapping> versionMap = tm.getVersionMap();
            boolean dateVersion = false;
            for (Map.Entry<DBField, ColumnMapping> entry : versionMap.entrySet()) {
                if (Date.class.isAssignableFrom(entry.getValue().getClz())) {
                    dateVersion = true;
                    break;
                }
            }
            //更新成功,则变更对象的版本值
            if (dateVersion) {
                //对于日期,乐观锁最新的值需要从数据库获取.默认返回result的结果值
                return R2dbcOptimisticLock.setNewOptimisticLockValues(connection, obj).then(Mono.just(result));
            } else {
                List<SQLParameter> preResult = context.getPreResultParas();
                if (CollectionUtils.isNotEmpty(preResult)) {
                    for (SQLParameter v : preResult) {
                        OrmValueUtils.setValue(obj, v.getColumnMapping(), v.getValue());
                    }
                }
            }
        }
        return Mono.just(result);
    }

    @Override
    public <T extends DBObject> Mono<Integer> updateWithVersion(Connection conn, T obj) {
        return update(conn, obj, new SQLContext(), true);
    }

    @Override
    public Mono<Integer> merge(Connection connection, DBObject obj) {
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        List<ColumnMapping> cmList = table.getPkFields();
        DBField[] fields = new DBField[cmList.size()];
        boolean exist = true;
        for (int i = 0; i < cmList.size(); i++) {
            fields[i] = cmList.get(i).getField();
        }
        Object[] values = OrmUtils.getDataObjectValues(obj, fields);
        for (int i = 0; i < values.length; i++) {
            if (values[i] == null) {
                exist = false;
                break;
            }
        }
        return Mono.just(exist).flatMap(e -> {
            if (e) {
                return selectByPrimaryKeys(connection, obj.getClass(), values)
                        .flatMap(value -> Mono.just(Optional.of(value)))
                        .defaultIfEmpty(Optional.empty());
            } else {
                return Mono.just(Optional.empty());
            }
        }).flatMap(opt -> {//Optional中转结果值
            if (!opt.isPresent()) {
                return insert(connection, obj);
            } else {
                //去除相同值
                Map<DBField, Object> updateValueMap = obj.updateValueMap();
                for (Iterator<Map.Entry<DBField, Object>> it = updateValueMap.entrySet().iterator(); it.hasNext(); ) {
                    Map.Entry<DBField, Object> entry = it.next();
                    Object o = OrmValueUtils.getValue(opt.get(), table.getSchemaMap().get(entry.getKey()));
                    if (Objects.equals(o, entry.getValue())) {
                        it.remove();
                    }
                }
                //为空无需更新了.
                if (!updateValueMap.isEmpty()) {
                    return update(connection, obj);
                }
            }
            return Mono.just(0);
        });
    }

    @Override
    public <T extends DBObject> Mono<Integer> deleteByPrimaryKeys(Connection conn, Class<T> clz, Object... keyParams) {
        return deleteByPrimaryKeys(conn, false, clz, keyParams);
    }

    @Override
    public <T extends DBObject> Mono<Integer> delete(Connection connection, T entity) {
        return delete(connection, false, entity);
    }

    @Override
    public <T extends DBObject> Mono<Integer> logicDeleteByPrimaryKeys(Connection conn, Class<T> clz, Object... keyParams) {
        return deleteByPrimaryKeys(conn, true, clz, keyParams);
    }

    @Override
    public <T extends DBObject> Mono<Integer> deleteByPrimaryKeys(Connection conn, boolean logicDelete, Class<T> clz, Object... keyParams) {
        TableMapping table = MetaHolder.getMeta(clz);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        SQLContext context = null;
        if (logicDelete && table.getLogicDeleteField() != null) {
            DBObject obj = OrmValueUtils.instance(table);
            CrudModelImpl.setLogicDeleteValue(obj, table);
            List<ColumnMapping> pKeys = table.getPkFields();
            for (int i = 0; i < pKeys.size(); i++) {
                obj.prepareUpdate(pKeys.get(i).getField(), keyParams[i]);
            }
            context = new SQLContext();
            ModelSql.forModelUpdate(dbContext, table, dialect, obj, context, false, false);
        } else {
            context = ModelSql.forModelDeleteByIds(dbContext, table, dialect, keyParams);
        }
        return execute(conn, context);
    }

    @Override
    public Mono<Integer> logicDelete(Connection conn, DBObject obj) {
        return delete(conn, true, obj);
    }

    @Override
    public Mono<Integer> delete(Connection conn, boolean logicDelete, DBObject obj) {
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        EntityListenerManager.runPreRemove(obj);

        SQLContext context = null;
        if (logicDelete && table.getLogicDeleteField() != null) {
            CrudModelImpl.setLogicDeleteValue(obj, table);
            context = new SQLContext();
            ModelSql.forModelUpdate(dbContext, table, dialect, obj, context, false, false);
        } else {
            context = ModelSql.forModelDelete(dbContext, table, dialect, obj);
        }
        return execute(conn, context).doOnNext(s -> EntityListenerManager.runPostRemove(obj));
    }

    @Override
    public Mono<Integer> execute(Connection conn, SQLContext context) {
        Assert.notNull(context, "context is null.");
        if (StringUtils.isBlank(context.getSql())) {
            logger.warn("execute sql is null.");
            return Mono.just(0);
        }
        LogContext log = OrmLog.commonLog(null, context.getSql(), context.getValues());
        if (StringUtils.isBlank(context.getSql())) {
            return Mono.just(0);
        }
        BindMarkersFactory bmf = R2dbcUtils.getBindMarkersFactory(conn);
        List<BindMarker> bindMarkerList = new ArrayList<>();
        String innerSql = R2dbcUtils.parseSql(bmf, bindMarkerList, context.getSql());

        Statement statement = conn.createStatement(innerSql);
        R2dbcSql.fillSQLStatement(statement, bindMarkerList, context.getParas());
//        OrmLog.sqlLog(null, log, () -> R2dbcUtils.getAutoCommit(conn));

        return Flux.from(statement.execute()).flatMap(Result::getRowsUpdated).cast(Number.class)
                .map(NumberUtils::numberToInt).last(0)
                .doOnNext(c -> {
                    OrmLog.resultSqlLog(null, log, c, () -> R2dbcUtils.getAutoCommit(conn));
                }).doOnError(throwable -> {
                    OrmLog.resultSqlLog(null, log, throwable);
                });
    }

    /**
     * Batch save models using the "insert into ..." sql generated by the model in modelList.
     */
    @Override
    public Flux<Integer> batchInsert(Connection conn, Collection<? extends DBObject> modelList, boolean insertFast, boolean useOptimisticLock) {
        if (CollectionUtils.isEmpty(modelList)) {
            return Flux.empty();
        }
        DBObject data = modelList.iterator().next();
        TableMapping table = MetaHolder.getMeta(data.getClass());
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        Flux<Integer> flux = Flux.empty();
        {
            Map<String, List<SQLContext>> map1 = new HashMap<>();
            Map<String, List<Integer>> map2 = new HashMap<>();
            Map<String, List<DBObject>> map3 = new HashMap<>();
            for (DBObject object : modelList) {
                SQLContext context = ModelSql.model2Save(null, dbContext, table, dialect, object, useOptimisticLock);
                List<SQLContext> list1 = map1.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list1.add(context);
                List<Integer> list2 = map2.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list2.add(1);
                List<DBObject> list3 = map3.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list3.add(object);
            }
            boolean first = true;
            for (Map.Entry<String, List<SQLContext>> entry : map1.entrySet()) {
                String sql = entry.getKey();
                List<SQLContext> list1 = entry.getValue();
                List<Integer> list2 = map2.get(sql);
                List<DBObject> list3 = map3.get(sql);
                Flux<Integer> tempFlux = batch(conn, dialect, table, sql, list1, list2, list3, DMLType.INSERT, insertFast);
                if (first) {
                    flux = tempFlux;
                } else {
                    flux = flux.mergeWith(tempFlux);
                }
                first = false;
            }
        }
        return flux;
    }

    /**
     * Batch update models using the attrs names of the model in
     * modelList.
     */
    @Override
    public Flux<Integer> batchUpdate(Connection conn, Collection<? extends DBObject> modelList) {
        if (CollectionUtils.isEmpty(modelList)) {
            return Flux.empty();
        }
        DBObject data = modelList.iterator().next();
        TableMapping table = MetaHolder.getMeta(data.getClass());
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        if (dialect.getProperty(DBProperty.NVL_FUNCTION) == null) {

        }
        Flux<Integer> flux = Flux.empty();
        {
            //save the context for every model,group by sql
            Map<String, List<SQLContext>> map1 = new HashMap<>();
            Map<String, List<Integer>> map2 = new HashMap<>();
            Map<String, List<DBObject>> map3 = new HashMap<>();
            for (DBObject object : modelList) {
                SQLContext context = ModelSql.forModelUpdate(dbContext, table, dialect, object, new SQLContext(), true, false);
                List<SQLContext> list1 = map1.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list1.add(context);
                List<Integer> list2 = map2.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list2.add(1);
                List<DBObject> list3 = map3.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list3.add(object);
            }
            boolean first = true;
            for (Map.Entry<String, List<SQLContext>> entry : map1.entrySet()) {
                String sql = entry.getKey();
                List<SQLContext> list1 = entry.getValue();
                List<Integer> list2 = map2.get(sql);
                List<DBObject> list3 = map3.get(sql);
                Flux<Integer> tempFlux = batch(conn, dialect, table, sql, list1, list2, list3, DMLType.UPDATE, false);
                if (first) {
                    flux = tempFlux;
                } else {
                    flux = flux.mergeWith(tempFlux);
                }
                first = false;
            }
        }
        return flux;
    }

    /**
     * Batch delete records using the columns names of the record in recordList.
     * @param conn
     * @param modelList the table name
     * @return
     */
    @Override
    public Flux<Integer> batchDelete(Connection conn, Collection<? extends DBObject> modelList) {
        if (CollectionUtils.isEmpty(modelList)) {
            return Flux.empty();
        }
        DBObject data = modelList.iterator().next();
        TableMapping table = MetaHolder.getMeta(data.getClass());
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        DBContext dbContext = R2dbcUtils.getDBContext(conn);
        Flux<Integer> flux = Flux.empty();
        {
            Map<String, List<SQLContext>> map1 = new HashMap<>();
            Map<String, List<Integer>> map2 = new HashMap<>();
            Map<String, List<DBObject>> map3 = new HashMap<>();
            for (DBObject object : modelList) {
                SQLContext context = ModelSql.forModelDelete(dbContext, table, dialect, object);
                List<SQLContext> list1 = map1.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list1.add(context);
                List<Integer> list2 = map2.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list2.add(1);
                List<DBObject> list3 = map3.computeIfAbsent(context.getSql(), k -> new LinkedList<>());
                list3.add(object);
            }
            boolean first = true;
            for (Map.Entry<String, List<SQLContext>> entry : map1.entrySet()) {
                String sql = entry.getKey();
                List<SQLContext> list1 = entry.getValue();
                List<Integer> list2 = map2.get(sql);
                List<DBObject> list3 = map3.get(sql);
                Flux<Integer> tempFlux = batch(conn, dialect, table, sql, list1, list2, list3, DMLType.DELETE, false);
                if (first) {
                    flux = tempFlux;
                } else {
                    flux = flux.mergeWith(tempFlux);
                }
                first = false;
            }
        }
        return flux;
    }

    /**
     * 批处理
     * @param connection 连接
     * @param dialect    方言
     * @param sql
     * @param list       集合数据
     * @param type       SQL类型
     * @param insertFast 快速插入标志
     * @return 执行结果
     */
    @Override
    public Flux<Integer> batch(Connection connection, DBDialect dialect, TableMapping table, String sql, List<SQLContext> list, List<Integer> dataSizeList, List<DBObject> dataList, DMLType type, boolean insertFast) {
        if (dialect == null) {
            dialect = R2dbcUtils.getDBDialect(connection);
        }

        if (list == null || list.size() == 0) {
            return Flux.empty();
        }
        if (StringUtils.isBlank(sql)) {
            return Flux.empty();
        }
        boolean isInsert = type == DMLType.INSERT;

        CrudModelImpl.doPreMethod(dataList, type);

        int size = list.size();
        boolean isSingle = size == 1;//是否为单个对象
        DBObject temp = dataList.get(0);
        List<ColumnMapping> pkFields = table.getPkFields();
        String[] pKeys = new String[pkFields.size()];
        if (CollectionUtils.isNotEmpty(pkFields)) {
            for (int i = 0; i < pkFields.size(); i++) {
                ColumnMapping cm = pkFields.get(i);
                pKeys[i] = cm.getRawColumnName();
            }
        }

        boolean returnKey = false;
        LogContext log = new LogContext();
        log.setStart(System.currentTimeMillis());
        log.setSql(sql);

        if (isInsert && !insertFast && !dialect.isNosql()) {
            if (CollectionUtils.isNotEmpty(pkFields)) {
                for (ColumnMapping pkCm : pkFields) {
                    if (pkCm.getGv() != null && (pkCm.getGv().strategy() == GenerationType.AUTO ||
                            pkCm.getGv().strategy() == GenerationType.SEQUENCE || pkCm.getGv().strategy() == GenerationType.IDENTITY) &&
                            JavaTypeUtils.isNumberClass(pkCm.getClz())) {
                        returnKey = true;
                        break;
                    }
                }
            }
        }

        BindMarkersFactory bmf = R2dbcUtils.getBindMarkersFactory(connection);
        List<BindMarker> bindMarkerList = new ArrayList<>();
        String innerSql = R2dbcUtils.parseSql(bmf, bindMarkerList, sql);

        Statement statement = connection.createStatement(innerSql);
        boolean first = true;
        int count = 1;
        for (SQLContext sqlContext : list) {
            int j = 0;
            //记录日志
            List<Object> values = new ArrayList<>();
            if (!first) {
                statement.add();
            }
            List<SQLParameter> columns = sqlContext.getParas();
            for (SQLParameter p : columns) {
                Object value = p.getValue();//此处不使用p.getValue()是为了兼容list列表
                ColumnMapping column = p.getColumnMapping();
                R2dbcSql.fillSQLStatement(statement, bindMarkerList.get(j++), value, column);
                values.add(value);
            }
            //批量处理
            OrmLog.batchCommonLog(null, log, sql, !isSingle, Integer.MAX_VALUE, count++, values);
            first = false;
        }
        if (OrmLog.needShowSql()) {
            log.setResult(new ArrayList<>(size == 0 ? 1 : size));
        }
        if (returnKey) {
            statement = statement.returnGeneratedValues(pKeys);
            Iterator<DBObject> it = dataList.iterator();
            return Flux.from(statement.execute()).flatMap(r -> r.map((row, metadata) -> {
                        if (it.hasNext()) {
                            DBObject obj = it.next();
                            for (ColumnMapping cm : pkFields) {
                                Object value = R2dbcConvertUtils.get(row.get(cm.getRawColumnName()), null, cm);
                                OrmValueUtils.setValue(obj, cm, value);
                            }
                            obj.clearUpdate();
                        }
                        return 1;
                    })).doOnNext(s -> {
                        CrudModelImpl.doPostMethod(dataList, type);
                    }).doOnNext(t -> {
                        if (log.getResult() != null) {
                            ((List<Integer>) log.getResult()).add(t);
                        }
                    })
                    .doOnComplete(() -> OrmLog.resultSqlLog(null, log, log.getResult(), () -> R2dbcUtils.getAutoCommit(connection), null, true))
                    .doOnCancel(() -> OrmLog.resultSqlLog(null, log, log.getResult(), () -> R2dbcUtils.getAutoCommit(connection), null, true))
                    .doOnError(throwable -> OrmLog.resultSqlLog(null, log, (Exception) throwable));
        } else {
            return Flux.from(statement.execute()).flatMap(Result::getRowsUpdated).cast(Number.class)
                    .map(NumberUtils::numberToInt)
                    .doOnNext(s -> {
                        CrudModelImpl.doPostMethod(dataList, type);
                    }).doOnNext(t -> {
                        if (log.getResult() != null) {
                            ((List<Integer>) log.getResult()).add(t);
                        }
                    })
                    .doOnComplete(() -> OrmLog.resultSqlLog(null, log, log.getResult(), () -> R2dbcUtils.getAutoCommit(connection), null, true))
                    .doOnCancel(() -> OrmLog.resultSqlLog(null, log, log.getResult(), () -> R2dbcUtils.getAutoCommit(connection), null, true))
                    .doOnError(throwable -> OrmLog.resultSqlLog(null, log, (Exception) throwable));
        }
    }

    @Override
    public <T extends DBObject> Flux<T> fetchCascade(Connection conn, T query, Class<T> clz, DBCascadeField... fields) {
        return selectList(conn, query).flatMap(t -> fetchLinks(conn, t, fields));
    }

    @Override
    public <T extends DBObject> Mono<T> fetchLinks(Connection conn, T obj) {
        Assert.notNull(obj, "Object cannot be null!");
        return fetchLinks(conn, obj, (String[]) null);
    }

    @Override
    public <T extends DBObject> Mono<T> fetchLinks(Connection conn, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        String[] fed = null;
        if (fields != null) {
            fed = new String[fields.length];
            for (int i = 0; i < fields.length; i++) {
                fed[i] = fields[i].name();
            }
        }
        return fetchLinks(conn, obj, fed);
    }

    @Override
    public <T extends DBObject> Mono<T> fetchLinks(Connection conn, T obj, String... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        Flux<Integer> flux = Flux.just(0);
        if (fields != null && fields.length > 0) {
            for (int i = 0; i < fields.length; i++) {
                String field = fields[i];
                ColumnMapping cm = table.getJavaFieldColumnMapping(field);
                if (cm != null && cm.isCascade()) {
                    flux = flux.mergeWith(fetchSubJoin(conn, obj, cm));
                }
            }
        } else {
            List<ColumnMapping> columnMappings = table.getMetaFields();
            for (ColumnMapping cm : columnMappings) {
                if (cm.isCascade()) {
                    flux = flux.mergeWith(fetchSubJoin(conn, obj, cm));
                }
            }
        }
        return flux.then(Mono.just(obj));
    }

    private <T extends DBObject> Mono<Integer> fetchSubJoin(Connection conn, T obj, ColumnMapping cm) {
        return fetchSubJoinLink(conn, obj, cm);
    }

    private Mono<Integer> fetchSubJoinLink(Connection conn, DBObject obj, ColumnMapping cm) {
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc == null) {
            MetaHolder.cascade(MetaHolder.getMeta(obj.getClass()), cm);
            cc = cm.getCascadeConfig();
        }
        CascadeContext select = cc.getSelectSubObject();
        Assert.notNull(select, "Cascade configuration information not found!");
        List<ColumnMapping> mainTableParas = select.getParas();
        SQLContext selectOrm = new SQLContext();
        selectOrm.setSql(select.getSqlByDialect(dialect));
        List<SQLParameter> paras = new ArrayList<>();
        for (ColumnMapping from : mainTableParas) {
            Object value = OrmValueUtils.getValue(obj, from);
            //有值为null,则直接返回,无需再查询.
            if (value == null) {
                return Mono.just(0);
            }
            paras.add(new SQLParameter(value, from));
        }
        selectOrm.setParas(paras);
        if (Collection.class.isAssignableFrom(cm.getClz())) {
            //集合
            List<Object> subObjs = new ArrayList<>();
            return selectList(conn, cc.getToTable().getThisType(), selectOrm).doOnNext(subObjs::add).then(Mono.create(sink -> {
                CascadeUtils.setCollectionValues(obj, cm, subObjs);
                sink.success(0);
            }));
        } else if (Map.class.isAssignableFrom(cm.getClz())) {
            List<Object> subObjs = new ArrayList<>();
            return selectList(conn, cc.getToTable().getThisType(), selectOrm).doOnNext(subObjs::add).then(Mono.create(sink -> {
                CascadeUtils.setMapValues(obj, cm, subObjs);
                sink.success(0);
            }));
        } else {
            //单一值
            //设置单一值
            return selectOne(conn, cc.getToTable().getThisType(), selectOrm).map(val -> {
                OrmValueUtils.setValue(obj, cm, val);
                return 0;
            }).defaultIfEmpty(0);
        }
    }

    @Override
    public Mono<Integer> insertCascade(Connection conn, DBObject obj, DBCascadeField... fields) {
        return insert(conn, obj).flatMap(count -> insertLinks(conn, obj, fields).then(Mono.just(count)));
    }

    @Override
    public <T extends DBObject> Mono<T> insertLinks(Connection con, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        Flux<Integer> flux = Flux.just(0);
        if (fields != null && fields.length > 0) {
            for (int i = 0; i < fields.length; i++) {
                DBCascadeField field = fields[i];
                ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
                if (cm != null && cm.isCascade()) {
                    flux = flux.mergeWith(insertLink(con, obj, cm));
                }
            }
        } else {
            List<ColumnMapping> columnMappings = table.getMetaFields();
            for (ColumnMapping cm : columnMappings) {
                if (cm.isCascade()) {
                    flux = flux.mergeWith(insertLink(con, obj, cm));
                }
            }
        }
        return flux.reduce(0, Integer::sum).then(Mono.just(obj));
    }

    private <T extends DBObject> Mono<Integer> insertLink(Connection con, T obj, ColumnMapping cm) {
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc != null) {
            //插入子对象
            return insertSubObject(con, obj, cm).defaultIfEmpty(0).flatMap(count -> {
                if (cc.getInsertRelation() != null) {
                    //插入关联关系表,对应多对多
                    return insertRelation(con, obj, cm.getCascadeField()).then(Mono.just(count));
                } else if (cc.getUpdateRelation() != null) {
                    Object subObject = OrmValueUtils.getValue(obj, cm);
                    return updateRelation(con, obj, cm, subObject).then(Mono.just(count));
                }
                return Mono.just(count);
            });
        }
        return Mono.just(0);
    }

    private Mono<Integer> insertSubObject(Connection conn, DBObject obj, ColumnMapping cm) {
        Object subObject = OrmValueUtils.getValue(obj, cm);
        CascadeConfig cc = cm.getCascadeConfig();
        Map<ColumnMapping, ColumnMapping> fromToColumns = null;
        if (cc != null && !cc.isUseMappedBy()) {
            fromToColumns = cc.getFromToColumns();
        }
        Flux<Integer> flux = Flux.just(0);
        if (subObject != null) {
            if (Collection.class.isAssignableFrom(subObject.getClass())) {
                Collection collection = (Collection) subObject;
                for (Object sub : collection) {
                    setSubJoinColumnValue(obj, fromToColumns, sub);
                    flux = flux.mergeWith(insert(conn, (DBObject) sub));
                }
            } else if (Map.class.isAssignableFrom(subObject.getClass())) {
                //不支持map
                logger.warn(" not support insert sub object by map");
            } else {
                setSubJoinColumnValue(obj, fromToColumns, subObject);
                flux = flux.mergeWith(insert(conn, (DBObject) subObject));
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    /**
     * 设置子对象的关联字段值
     * @param obj
     * @param fromToColumns
     * @param sub
     */
    private void setSubJoinColumnValue(DBObject obj, Map<ColumnMapping, ColumnMapping> fromToColumns, Object sub) {
        if (fromToColumns != null) {
            for (Map.Entry<ColumnMapping, ColumnMapping> entry : fromToColumns.entrySet()) {
                Object joinColumnValue = OrmValueUtils.getValue(obj, entry.getKey());
                OrmValueUtils.setValue(sub, entry.getValue(), joinColumnValue);
            }
        }
    }

    @Override
    public <T extends DBObject> Mono<T> insertRelation(Connection con, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        Assert.notNull(fields, "fields is null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        Flux<Integer> flux = Flux.just(0);
        for (int i = 0; i < fields.length; i++) {
            DBCascadeField field = fields[i];
            ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
            if (cm != null && cm.isCascade()) {
                flux = flux.mergeWith(insertRelation(con, obj, cm));
            } else {
                throw new RuntimeException("field not found!");
            }
        }
        return flux.then(Mono.just(obj));
    }

    private Mono<Integer> insertRelation(Connection conn, DBObject obj, ColumnMapping cm) {
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        int count = 0;
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc == null) {
            MetaHolder.cascade(MetaHolder.getMeta(obj.getClass()), cm);
            cc = cm.getCascadeConfig();
        }
        Object subObject = OrmValueUtils.getValue(obj, cm);
        //为空或者不是集合对象
        if (subObject == null || !Collection.class.isAssignableFrom(subObject.getClass())) {
            return Mono.just(count);
        }
        Collection<?> subObjects = (Collection<?>) subObject;
        CascadeContext insert = cc.getInsertRelation();
        String sql = insert.getSqlByDialect(dialect);
        List<ColumnMapping> mainTableParas = insert.getParas();
        //设置关系表的值,几个子对象,设置几个子对象的关系.
        List<Object[]> relationsValues = new ArrayList<>();
        for (int i = 0; i < subObjects.size(); i++) {
            Object[] relationValues = new Object[cc.getMiddleTableColumns().size()];
            relationsValues.add(relationValues);
        }
        int i = 0;
        for (ColumnMapping column : mainTableParas) {
            if (column.getMeta().getThisType() == cc.getFromTable().getThisType()) {
                Object value = OrmValueUtils.getValue(obj, column);
                for (int j = 0; j < subObjects.size(); j++) {
                    relationsValues.get(j)[i] = value;
                }
            } else if (column.getMeta().getThisType() == cc.getToTable().getThisType()) {
                int k = 0;
                for (Object sub : subObjects) {
                    Object subValue = OrmValueUtils.getValue(sub, column);
                    relationsValues.get(k)[i] = subValue;
                    k++;
                }
            }
            i++;
        }
        if (!relationsValues.isEmpty()) {
            return R2dbcCrud.getInstance().getCrudSql().executeBatch(conn, sql, relationsValues).reduce(0, Integer::sum);
        } else {
            logger.warn("insert sub object relation is empty");
        }
        return Mono.just(count);
    }

    @Override
    public Mono<Integer> updateCascade(Connection conn, DBObject obj, DBCascadeField... fields) {
        updateLinks(conn, obj, fields);
        return update(conn, obj);
    }

    @Override
    public <T extends DBObject> Mono<Integer> updateLinks(Connection con, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        List<ColumnMapping> columnMappings = table.getMetaFields();
        Flux<Integer> flux = Flux.just(0);
        if (fields != null && fields.length > 0) {
            for (int i = 0; i < fields.length; i++) {
                DBCascadeField field = fields[i];
                ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
                if (cm != null && cm.isCascade()) {
                    flux = flux.mergeWith(updateLink(con, obj, cm));
                }
            }
        } else {
            for (ColumnMapping cm : columnMappings) {
                if (cm.isCascade()) {
                    flux = flux.mergeWith(updateLink(con, obj, cm));
                }
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    private <T extends DBObject> Mono<Integer> updateLink(Connection con, T obj, ColumnMapping cm) {
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc != null) {
            //更新子对象
            return updateSubObject(con, obj, cm).defaultIfEmpty(0).flatMap(count -> {
                //可以判断是JoinTable关系
                if (cc.getInsertRelation() != null) {
                    //更新关联关系
                    return updateRelation(con, obj, cm.getCascadeField());
                }
                return Mono.just(count);
            });
        }
        return Mono.just(0);
    }

    /**
     * @param conn
     * @param obj
     * @param cm
     * @return 更新对象
     */
    private Mono<Integer> updateSubObject(Connection conn, DBObject obj, ColumnMapping cm) {
        Object subObject = OrmValueUtils.getValue(obj, cm);
        CascadeConfig cc = cm.getCascadeConfig();
        Map<ColumnMapping, ColumnMapping> fromToColumns = null;
        if (cc != null && !cc.isUseMappedBy()) {
            fromToColumns = cc.getFromToColumns();
        }
        Flux<Integer> flux = Flux.just(0);
        if (subObject != null) {
            if (Collection.class.isAssignableFrom(subObject.getClass())) {
                Collection<?> collection = (Collection<?>) subObject;
                for (Object sub : collection) {
                    setSubJoinColumnValue(obj, fromToColumns, sub);
                    DBObject subObj = (DBObject) sub;
                    if (!subObj.updateValueMap().isEmpty()) {
                        flux = flux.mergeWith(update(conn, subObj));
                    }
                }
            } else if (Map.class.isAssignableFrom(subObject.getClass())) {
                //不支持map
                logger.warn(" not support update sub object by map");
            } else {
                setSubJoinColumnValue(obj, fromToColumns, subObject);
                DBObject subObj = (DBObject) subObject;
                if (!subObj.updateValueMap().isEmpty()) {
                    flux = flux.mergeWith(update(conn, subObj));
                }
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    @Override
    public <T extends DBObject> Mono<Integer> updateRelation(Connection con, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        Assert.notNull(fields, "Field cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        Flux<Integer> flux = Flux.just(0);
        for (int i = 0; i < fields.length; i++) {
            DBCascadeField field = fields[i];
            ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
            if (cm != null && cm.isCascade()) {
                Object subObject = OrmValueUtils.getValue(obj, cm);
                flux = flux.mergeWith(updateRelation(con, obj, cm, subObject));
            } else {
                throw new RuntimeException("field not found!");
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    /**
     * @param con
     * @param obj       主对象
     * @param cm        obj的级联字段
     * @param subObject 子对象
     * @param <T>
     * @return 更新的关系数量
     */
    private <T extends DBObject> Mono<Integer> updateRelation(Connection con, T obj, ColumnMapping cm, Object subObject) {
        Assert.notNull(obj, "Object cannot be null!");
        Assert.notNull(cm, "Field cannot be null!");
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc == null) {
            MetaHolder.cascade(MetaHolder.getMeta(obj.getClass()), cm);
            cc = cm.getCascadeConfig();
        }
        DBDialect dialect = R2dbcUtils.getDBDialect(con);
        if (cc.getType() == CascadeConfig.LinkType.JoinColumns) {
            CascadeContext context = cc.getUpdateRelation();
            if (context != null) {
                List<Object[]> relationValues = new ArrayList<>();
                if (Collection.class.isAssignableFrom(subObject.getClass())) {
                    Collection subObjects = (Collection) subObject;
                    for (Object sub : subObjects) {
                        List<Object> rv = new ArrayList<>();
                        for (ColumnMapping co : context.getParas()) {
                            if (co.getMeta().getThisType() == obj.getClass()) {
                                rv.add(OrmValueUtils.getValue(obj, co));
                            } else if (co.getMeta().getThisType() == sub.getClass()) {
                                rv.add(OrmValueUtils.getValue(sub, co));
                            }
                        }
                        relationValues.add(rv.toArray());
                    }
                } else if (Map.class.isAssignableFrom(subObject.getClass())) {
                    //不支持map
                    logger.warn(" not support update sub object by map");
                } else {
                    List<Object> rv = new ArrayList<>();
                    for (ColumnMapping co : context.getParas()) {
                        if (co.getMeta().getThisType() == obj.getClass()) {
                            rv.add(OrmValueUtils.getValue(obj, co));
                        } else if (co.getMeta().getThisType() == subObject.getClass()) {
                            rv.add(OrmValueUtils.getValue(subObject, co));
                        }
                    }
                    relationValues.add(rv.toArray());
                }
                if (!relationValues.isEmpty()) {
                    return R2dbcCrud.getInstance().getCrudSql().executeBatch(con, context.getSqlByDialect(dialect), relationValues).then(Mono.just(0));
                } else {
                    logger.warn("update sub object relation is empty");
                }
            }
        } else if (cc.getType() == CascadeConfig.LinkType.JoinTable) {
            if (subObject != null) {
                return deleteRelation(con, obj, cm).defaultIfEmpty(0).flatMap(c -> insertRelation(con, obj, cm));
            }
        }
        return Mono.just(0);
    }

    @Override
    public <T extends DBObject> Mono<Integer> deleteCascade(Connection conn, T obj, DBCascadeField... fields) {
        return deleteLinks(conn, obj, fields).defaultIfEmpty(0).flatMap(c -> delete(conn, obj));
    }

    @Override
    public <T extends DBObject> Mono<Integer> deleteLinks(Connection conn, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        List<ColumnMapping> columnMappings = table.getMetaFields();
        Flux<Integer> flux = Flux.just(0);
        if (fields != null && fields.length > 0) {
            for (int i = 0; i < fields.length; i++) {
                DBCascadeField field = fields[i];
                ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
                if (cm != null && cm.isCascade()) {
                    flux = flux.mergeWith(deleteLink(conn, obj, cm));
                }
            }
        } else {
            for (ColumnMapping cm : columnMappings) {
                if (cm.isCascade()) {
                    flux = flux.mergeWith(deleteLink(conn, obj, cm));
                }
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    private <T extends DBObject> Mono<Integer> deleteLink(Connection con, T obj, ColumnMapping cm) {
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc != null && cc.getDeleteSubObject() != null) {
            //删除子对象
            return deleteSubObject(con, obj, cm).defaultIfEmpty(0).flatMap(count -> {
                if (cc.getDeleteRelation() != null) {
                    //删除关联关系
                    return deleteRelation(con, obj, cm).then(Mono.just(count));
                }
                return Mono.just(count);
            });
        }
        return Mono.just(0);
    }

    private Mono<Integer> deleteSubObject(Connection conn, DBObject obj, ColumnMapping cm) {
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc == null) {
            MetaHolder.cascade(MetaHolder.getMeta(obj.getClass()), cm);
            cc = cm.getCascadeConfig();
        }
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        CascadeContext delete = cc.getDeleteSubObject();
        String sql = delete.getSqlByDialect(dialect);
        List<ColumnMapping> mainTableParas = delete.getParas();
        //设置主表的值
        List<Object> relationValues = new ArrayList<>();
        for (ColumnMapping column : mainTableParas) {
            Object value = OrmValueUtils.getValue(obj, column);
            relationValues.add(value);
        }
        return R2dbcCrud.getInstance().getCrudSql().execute(conn, sql, relationValues.toArray());
    }

    @Override
    public <T extends DBObject> Mono<Integer> deleteRelation(Connection con, T obj, DBCascadeField... fields) {
        Assert.notNull(obj, "Object cannot be null!");
        Assert.notNull(fields, "Field cannot be null!");
        TableMapping table = MetaHolder.getMeta(obj.getClass());
        Flux<Integer> flux = Flux.just(0);
        for (int i = 0; i < fields.length; i++) {
            DBCascadeField field = fields[i];
            ColumnMapping cm = table.getJavaFieldColumnMapping(field.name());
            if (cm != null && cm.isCascade()) {
                flux = flux.mergeWith(deleteRelation(con, obj, cm));
            } else {
                throw new RuntimeException("field not found!");
            }
        }
        return flux.reduce(0, Integer::sum);
    }

    private Mono<Integer> deleteRelation(Connection conn, DBObject obj, ColumnMapping cm) {
        CascadeConfig cc = cm.getCascadeConfig();
        if (cc == null) {
            MetaHolder.cascade(MetaHolder.getMeta(obj.getClass()), cm);
            cc = cm.getCascadeConfig();
        }
        CascadeContext delete = cc.getDeleteRelation();
        DBDialect dialect = R2dbcUtils.getDBDialect(conn);
        String sql = delete.getSqlByDialect(dialect);
        List<ColumnMapping> mainTableParas = delete.getParas();
        //设置主表的值
        List<Object> relationValues = new ArrayList<>();
        for (ColumnMapping column : mainTableParas) {
            Object value = OrmValueUtils.getValue(obj, column);
            relationValues.add(value);
        }
        return R2dbcCrud.getInstance().getCrudSql().execute(conn, sql, relationValues.toArray());
    }
}
